package com.example.dobs.demo.flink.realtime.report;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.twitter.chill.protobuf.ProtobufSerializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.example.dobs.demo.flink.realtime.report.bean.Log;
import com.example.dobs.demo.flink.realtime.report.utils.MyProcessFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

import static com.example.dobs.demo.flink.realtime.report.utils.LogParse.*;
import static com.example.dobs.demo.flink.realtime.report.utils.MyUtils.*;

/**
 * Real-time ADX report job.
 *
 * <p>Topology: Kafka (JSON log lines) → parse into {@code Tuple2<key, Log.Event>} →
 * event-time watermarks → keyed pre-aggregation ({@link MyProcessFunction}) →
 * count-window merge (100k records) → JSON string → Kafka sink.
 *
 * <p>Checkpointing is exactly-once every 60s to a filesystem state backend; config is
 * read from {@code adx_report.conf} under {@code prod.adx_report_job}.
 */
public class RealtimeReport {
    /** Shared empty-string constant; kept public — may be referenced by other classes. */
    public final static String blankString = "";

    public static void main(String[] args) throws Exception {
        Config config = ConfigFactory.load("adx_report.conf").getConfig("prod").getConfig("adx_report_job");
        // Read to fail fast if the key is missing; not otherwise used in this method.
        // NOTE(review): presumably consumed by MyProcessFunction via config — confirm.
        int timeToLiveMinutes = config.getInt("time-to-live-minutes");
        String checkpointDir = config.getString("checkpoint-dir");
        int kafkaParallelism = 8;      // production: 4, test: 2 — TODO: move into config
        int sinkKafkaParallelism = 8;  // production: 8, test: 2 — TODO: move into config
        long delay = 30000L;           // max out-of-orderness (ms) tolerated by the watermark

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing: exactly-once, every 60s, persisted to the configured FS backend.
        see.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        see.enableCheckpointing(60000);
        see.setStateBackend(new FsStateBackend(checkpointDir));

        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // event-time semantics
//        see.getConfig().setAutoWatermarkInterval(5000);

        // Log.Event is a protobuf message; Kryo cannot serialize it natively, so
        // register Twitter chill's protobuf serializer for it.
        see.getConfig().registerTypeWithKryoSerializer(Log.Event.class, ProtobufSerializer.class);

        // Kafka source/sink settings.
        String sourceServers = config.getConfig("source-kafka").getString("bootstrap-servers");
        String sourceTopic = config.getConfig("source-kafka").getString("topic");
        String sourceGroup = config.getConfig("source-kafka").getString("consumer-group");

        String sinkServers = config.getConfig("sink-kafka").getString("bootstrap-servers");
        String sinkTopic = config.getConfig("sink-kafka").getString("topic");
        Properties sourceKafkaProps = new Properties();
        sourceKafkaProps.setProperty("bootstrap.servers", sourceServers);
        sourceKafkaProps.setProperty("group.id", sourceGroup);
        Properties sinkKafkaProps = new Properties();
        sinkKafkaProps.setProperty("bootstrap.servers", sinkServers);

        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(sourceTopic, new SimpleStringSchema(), sourceKafkaProps);

        source.setStartFromLatest(); // consume from the latest Kafka offsets

        // Parse each raw record and assign event-time timestamps/watermarks.
        SingleOutputStreamOperator<Tuple2<String, Log.Event>> stream = see.addSource(source).setParallelism(kafkaParallelism)
                .flatMap((FlatMapFunction<String, Tuple2<String, Log.Event>>) (value, out) -> {
                    try { // best-effort parse: records that cannot be parsed are skipped
                        String message = JSON.parseObject(value).getString("message");
                        // "message" is "<json-log>\t<timestamp>"; split once, not per field.
                        String[] parts = message.split("\t");
                        String logStr = parts[0];
                        String timestamp = parts[1];
                        JSONObject eventJson = JSON.parseObject(logStr);
                        String eventName = eventJson.getString("event_name");
                        switch (eventName == null ? blankString : eventName) {
                            case "AD_AdxRequest":
                                requestLogParser(eventJson, out, timestamp);
                                break;
                            case "AD_AdxShowed":
                                impressionLogParser(eventJson, out, timestamp);
                                break;
                            case "AD_AdxClicked":
                                clickLogParser(eventJson, out, timestamp);
                                break;
                            default:
                                // Unknown event types are intentionally dropped.
                                break;
                        }
                    } catch (Exception ignored) {
                        // Malformed input is dropped on purpose; this stream is lossy by design.
                        // Consider counting drops via a metric instead of silent discard.
                    }
                }).setParallelism(kafkaParallelism).returns(TypeInformation.of(new TypeHint<Tuple2<String, Log.Event>>() {
                }))
                // Watermarks must be assigned for the event-time windows downstream.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Log.Event>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Log.Event> element) {
                        return element.f1.getTs();
                    }
                })
                .setParallelism(kafkaParallelism).name("adx_report_water_mark");

        // Pre-aggregate per dsp+rid+pid key (flushed on a 5-minute cadence inside MyProcessFunction).
        SingleOutputStreamOperator<Log.Event> keyByStream = stream.keyBy(new KeySelector<Tuple2<String, Log.Event>, String>() {
            @Override
            public String getKey(Tuple2<String, Log.Event> value) throws Exception {
                return value.f1.getDsp() + value.f1.getRid() + value.f1.getPid();
            }
        }).process(new MyProcessFunction()).setParallelism(kafkaParallelism * 2).name("keyByStream");

        // Second-stage aggregation: merge once per 100k records per table dimension.
        SingleOutputStreamOperator<Log.Event> final_join = keyByStream.keyBy(new KeySelector<Log.Event, String>() {
            @Override
            public String getKey(Log.Event value) throws Exception {
                return getTableDimension(value);
            }
        }).countWindow(100000).reduce(new ReduceFunction<Log.Event>() {
            @Override
            public Log.Event reduce(Log.Event value1, Log.Event value2) throws Exception {
                return eventMerge(value1, value2);
            }
        }).setParallelism(sinkKafkaParallelism).name("final join");

        // Serialize aggregated events to JSON and write them to the sink Kafka topic.
        final_join.map(new MapFunction<Log.Event, String>() {
            @Override
            public String map(Log.Event value) throws Exception {
                return getResultJson(value);
            }
        }).setParallelism(sinkKafkaParallelism)
                .addSink(new FlinkKafkaProducer<String>(sinkTopic, new SimpleStringSchema(), sinkKafkaProps))
                .name("adx_report_sink").setParallelism(sinkKafkaParallelism);

        see.execute("ads_adx_report_job");
    }
}
